package com.atguigu.flink.java.chapter_7.state;

import com.atguigu.flink.java.chapter_5.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;


/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/1/2 11:51
 */
public class Flink02_State_Keyed_AggregateState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment
          .getExecutionEnvironment()
          .setParallelism(3);
        env
          .socketTextStream("hadoop102", 9999)
          .map(value -> {
              String[] datas = value.split(",");
              return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

          })
          .keyBy(WaterSensor::getId)
          .process(new KeyedProcessFunction<String, WaterSensor, Double>() {

              private AggregatingState<Integer, Double> avgState;

              @Override
              public void open(Configuration parameters) throws Exception {
                  AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double> aggregatingStateDescriptor = new AggregatingStateDescriptor<>("avgState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                      @Override
                      public Tuple2<Integer, Integer> createAccumulator() {
                          return Tuple2.of(0, 0);
                      }

                      @Override
                      public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                          return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                      }

                      @Override
                      public Double getResult(Tuple2<Integer, Integer> accumulator) {
                          return accumulator.f0 * 1D / accumulator.f1;
                      }

                      @Override
                      public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                          return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                      }
                  }, Types.TUPLE(Types.INT, Types.INT));
                  avgState = getRuntimeContext().getAggregatingState(aggregatingStateDescriptor);
              }

              @Override
              public void processElement(WaterSensor value, Context ctx, Collector<Double> out) throws Exception {
                  avgState.add(value.getVc());
                  out.collect(avgState.get());
              }
          })
          .print();
        env.execute();
    }
}
